﻿using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CRL.Core;
using CRL.Core.Extension;
using CRL.EventBus.Queue;
using Org.Apache.Rocketmq;
namespace CRL.EventBus.RocketMQ
{
    public class RocketMQConfig : ConfigBase
    {
        public string Server { get; set; }

        public string AccessKey { get; set; }

        public string SecretKey { get; set; }
        public string Topic { get; set; }
        public bool ConsumerByNameSingle { get; set; }
        /// <summary>
        /// 不可见持续时间
        /// </summary>
        public TimeSpan InvisibleDuration { get; set; } = TimeSpan.FromSeconds(10);
    }
    class RocketMQ : AbsQueue
    {
        // see https://rocketmq.apache.org/zh/docs/sdk/04csharp/#%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
        ClientConfig clientConfig;
        RocketMQConfig _queueConfig;
        static Consumer errorQueueClient;
        string errorRepeatDelayQueue = "eventBusErrorSend";
        //List<Consumer> consumers = new List<Consumer>();
        static List<Consumer> consumers = new List<Consumer>();
        public override MQType MQType => MQType.RocketMQ;
        static RocketMQ()
        {
            AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
        }
        public RocketMQ(RocketMQConfig queueConfig)
        {
            _queueConfig = queueConfig;
            var credentialsProvider = new StaticSessionCredentialsProvider(queueConfig.AccessKey, queueConfig.SecretKey);
            string endpoints = queueConfig.Server;
            clientConfig = new ClientConfig.Builder()
                .SetEndpoints(endpoints)
                .SetCredentialsProvider(credentialsProvider)
                .Build();

            #region 异常重发队列
            if (errorQueueClient == null)
            {
                var subscription = new Dictionary<string, FilterExpression>();
                subscription.Add($"{_queueConfig.Topic}_delay", new FilterExpression(errorRepeatDelayQueue));
                var builder = new SimpleConsumer.Builder()
                    .SetClientConfig(clientConfig)
                    .SetConsumerGroup(_queueConfig.GroupName)
                    .SetAwaitDuration(TimeSpan.FromMinutes(1))
                    .SetSubscriptionExpression(subscription);
                var simpleConsumer = builder.Build().Result;
                consumers.Add(simpleConsumer);
                #region thread
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        try
                        {
                            var messageViews = simpleConsumer.Receive(50, TimeSpan.FromSeconds(15)).Result;
                            foreach (var m in messageViews)
                            {
                                var msgJson = Encoding.UTF8.GetString(m.Body);
                                var msgPackage = msgJson.ToObject<MsgPackage>();
                                PublishListBaseAsync(msgPackage.OriginRoutingKey, new List<MsgPackage> { new MsgPackage
                                {
                                    RoutingKey = msgPackage.OriginRoutingKey,
                                    _InnerData = msgPackage._InnerData,
                                    DelayMs = msgPackage.DelayMs,
                                    Priority = msgPackage.Priority,
                                    RetryTimes = msgPackage.RetryTimes,
                                    OriginRoutingKey = "",
                                    MsgKey=msgPackage.MsgKey,
                                    Time = msgPackage.Time
                                } }).Wait();
                                simpleConsumer.Ack(m);
                            }
                        }
                        catch (Exception ex)
                        {
                            var msg = $"simpleConsumer.Receive {ex}";
                            Console.WriteLine(msg);
                            //Log(eventDeclare, msg);
                        }
                    }
                });
                #endregion
            }
            #endregion
        }
        public override void Dispose()
        {

        }
        public override void PublishList(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            PublishListAsync(routingKey, msgs, optFunc).Wait();
        }
        public override async Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            var innerMsgs = msgs.Select(b => new MsgPackage { _InnerData = b.ToJson(), RoutingKey = routingKey });
            await PublishListBaseAsync(routingKey, innerMsgs, optFunc);
        }
        async Task PublishListBaseAsync(string routingKey, IEnumerable<MsgPackage> msgs, Action<PublishOption> optFunc = null)
        {
            var opt = new PublishOption();
            optFunc?.Invoke(opt);
            var topic = _queueConfig.Topic;
            var builder2 = new Producer.Builder();
            builder2.SetTopics(topic);
            builder2.SetClientConfig(clientConfig);

            var producer = await builder2.Build();
            var topic2 = topic;
            if (opt.DelayMs > 0)
            {
                //rocketmq topic类型需要一致
                //TRANSACTION, FIFO, DELAY 或NORMAL
                topic2 = $"{topic2}_delay";
            }
            // Define your message body.
            foreach (var msg in msgs)
            {
                var bytes = Encoding.UTF8.GetBytes(msg.ToJson());
                string tag = routingKey;
                var builder = new Message.Builder()
                    .SetTopic(topic2)
                    .SetBody(bytes)
                    .SetTag(tag)
                    // You could set multiple keys for the single message actually.
                    .SetKeys(msg.Id);
                if (opt.DelayMs > 0)
                {
                    builder.SetDeliveryTimestamp(DateTime.Now + TimeSpan.FromMilliseconds(opt.DelayMs));
                }
                var message = builder.Build();
                var sendReceipt = await producer.Send(message);
            }
            await producer.DisposeAsync();
        }
        public override void DeleteData(IData data)
        {
            var msgPackage = data as MsgPackage;
            var ed = SubscribeService.GetEventDeclare(data.RoutingKey);
            Log(ed, $"重复发送已终止 {msgPackage.ToJson()}", $"{ed.Name}.delete");
            SubscribeService.OnEventDataRemove(msgPackage);
        }
        public override void RePublish(IData data, object msg)
        {
            //使用默认ack无法控制重发间隔，便会保持重发消息顺序性(确认？)
            //使用延迟队列解决了重发间隔问题，但无法保持重发后的顺序性
            var ed = SubscribeService.GetEventDeclare(data.RoutingKey);
            var attr = ed.SubscribeAttribute;
            //发送到延迟队列
            var delayMs = attr.GetDelayTime(data);
            if (delayMs < 0)
            {
                return;
            }
            PublishListBaseAsync(errorRepeatDelayQueue, new List<MsgPackage>
            {
                new MsgPackage{
                    _InnerData = msg.ToJson(),
                    OriginRoutingKey = data.RoutingKey,
                    RoutingKey = errorRepeatDelayQueue,
                    RetryTimes = data.RetryTimes + 1,
                    DelayMs = data.DelayMs,
                    Priority = data.Priority,
                    MsgKey = data.MsgKey,
                    Time = data.Time
                } }, opt =>
            {
                opt.DelayMs = delayMs;
            }).Wait();
        }

        public override void Subscribe(EventDeclare eventDeclare)
        {
            SubscribeAsyncBase(eventDeclare);
        }
        public override void SubscribeAsync(EventDeclare eventDeclare)
        {
            SubscribeAsyncBase(eventDeclare);
        }
        List<EventDeclare> eventDeclares = new List<EventDeclare>();
        void SubscribeAsyncBase(EventDeclare eventDeclare)
        {
            eventDeclares.Add(eventDeclare);
        }
        public override void ConfirmConsume()
        {
            //按单独tag或队列类型分组
            var group = eventDeclares.GroupBy(b => b.SubscribeAttribute.DelayQueue.ToString());
            if (_queueConfig.ConsumerByNameSingle)
            {
                group = eventDeclares.GroupBy(b => b.Name);
            }
            foreach (var g in group)
            {
                ConsumeBase(g.ToArray()).Wait();
            }
        }
        async Task ConsumeBase(EventDeclare[] eds)
        {
            string consumerGroup = _queueConfig.GroupName;
            //不同类型的队列需指定不同的topic
            string topic = _queueConfig.Topic;
            var eventDeclare = eds.First();
            if (eventDeclare.SubscribeAttribute.DelayQueue)
            {
                topic = $"{topic}_delay";
            }
            //var eds = g.ToArray();
            var subscription = new Dictionary<string, FilterExpression>();
            // In most case, you don't need to create too many consumers, single pattern is recommended.
            if (_queueConfig.ConsumerByNameSingle)//按单个tag订阅
            {
                subscription.Add(topic, new FilterExpression(eventDeclare.Name));
            }
            else
            {
                subscription.Add(topic, new FilterExpression("*"));
            }
            var s = subscription.First();
            Console.WriteLine($"consumer topic:{s.Key} tag:{s.Value.Expression}");
            var builder = new SimpleConsumer.Builder()
                .SetClientConfig(clientConfig)
                .SetConsumerGroup(consumerGroup)
                .SetAwaitDuration(TimeSpan.FromMinutes(1))
                .SetSubscriptionExpression(subscription);
            var simpleConsumer = await builder.Build();
            consumers.Add(simpleConsumer);
            #region thread
            await Task.Factory.StartNew(async () =>
            {
                while (true)
                {
                    try
                    {
                        var messageViews = await simpleConsumer.Receive(eventDeclare.SubscribeAttribute.BatchSize, _queueConfig.InvisibleDuration);
                        await OnConsumeAsync(simpleConsumer, messageViews);
                    }
                    catch (Exception ex)
                    {
                        var msg = $"simpleConsumer.Receive {ex}";
                        Console.WriteLine(msg);
                        //Log(eventDeclare, msg);
                    }
                }
            });
            #endregion
        }
        async Task<bool> OnConsumeAsync(SimpleConsumer consumer,
            IEnumerable<MessageView> msgExts)
        {
            //return true;
            //Console.WriteLine($"receive ConsumerId {q.ConsumerId}");
            var group = msgExts.GroupBy(b => b.Tag);
            foreach (var g in group)
            {
                var tag = g.Key;
                if (string.IsNullOrEmpty(tag))
                {
                    continue;
                }
                var messageViews = g.ToArray();
                var ed = SubscribeService.GetEventDeclare(tag);
                if (ed == null)
                {
                    continue;
                }
                await dataHandler(consumer,ed, messageViews);
            }
            return true;
        }
        async Task dataHandler(SimpleConsumer consumer,EventDeclare ed, MessageView[] messageViews)
        {
            var instance = ed.CreateServiceInstance();
            object getArgsObj(MessageView m, Type innerType, out MsgPackage msgPackage)
            {
                var msgJson = Encoding.UTF8.GetString(m.Body);
                msgPackage = msgJson.ToObject<MsgPackage>();
                var item = SerializeHelper.DeserializeFromJson(msgPackage._InnerData, innerType);
                return item;
            }
            if (ed.IsArray)
            {
                var innerType = ed.EventDataType.GenericTypeArguments[0];
                var objInstance = DynamicMethodHelper.CreateCtorFuncFromCache(ed.EventDataType)();
                var method = ed.EventDataType.GetMethod("Add");
                var msgsDic = new Dictionary<MessageView, Tuple<MsgPackage,object>>();
                foreach (var m in messageViews)
                {
                    var item = getArgsObj(m, innerType, out var pack);
                    method.Invoke(objInstance, new object[] { item });
                    msgsDic.Add(m, new Tuple<MsgPackage, object>(pack, item));
                }
                object result = true;
                try
                {
                    result = await getInvokeResult(ed, instance, objInstance);
                }
                catch (Exception ero)
                {
                    result = false;
                    Log(ed, ero.ToString());
                }
                var invokeSuccess = checkInvokeSuccess(result);
                foreach (var kv in msgsDic)
                {
                    CheckResult(invokeSuccess, kv.Value.Item1, ed, kv.Value.Item2);
                    await consumer.Ack(kv.Key);
                }
            }
            else
            {
                foreach (var m in messageViews)
                {
                    var item = getArgsObj(m, ed.EventDataType, out var msgPackage);
                    object result = true;
                    try
                    {
                        result = await getInvokeResult(ed, instance, item);
                    }
                    catch (Exception ero)
                    {
                        result = false;
                        Log(ed, ero.ToString());
                    }
                    var invokeSuccess = checkInvokeSuccess(result);
                    CheckResult(invokeSuccess, msgPackage, ed, item);
                    await consumer.Ack(m);
                }
            }
        }
    }
}